package com.mygame.xinyue.kafka;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import com.mygame.common.utils.TopicUtil;
import com.mygame.game.GameMessageService;
import com.mygame.game.bus.GameMessageInnerDecoder;
import com.mygame.game.common.GameMessageHeader;
import com.mygame.game.common.GameMessagePackage;
import com.mygame.game.common.IGameMessage;
import com.mygame.game.message.xinyue.EnterGameMsgRequest;
import com.mygame.game.message.xinyue.EnterGameMsgResponse;
import com.mygame.gateway.message.context.ServerConfig;

/**
 * 此类主要用于学习和测试
 *
 * @ClassName: ReceiverGameMessageRequestService
 * @author: wgs
 * @date: 2019年5月29日 上午9:55:13
 */
//@Service  //测试的情况下打开
public class ReceiverGameMessageRequestService {
    private Logger logger = LoggerFactory.getLogger(ReceiverGameMessageRequestService.class);
    @Autowired
    private ServerConfig serverConfig;
    @Autowired
    private GameMessageService gameMessageService;
    @Autowired
    private KafkaTemplate<String, byte[]> kafkaTemplate;

    @KafkaListener(topics = {"${game.server.config.business-game-message-topic}"}, groupId = "${game.server.config.server-id}")
    public void consume(ConsumerRecord<String, byte[]> record) {
        GameMessagePackage gameMessagePackage = GameMessageInnerDecoder.readGameMessagePackage(record.value());
        logger.debug("接收收网关消息：{}", gameMessagePackage.getHeader());
        GameMessageHeader header = gameMessagePackage.getHeader();
        if (serverConfig.getServerId() == header.getToServerId()) {
            //如果此条消息的目标是这台服务器，则处理这条消息
            IGameMessage gameMessage = gameMessageService.getRequestInstanceByMessageId(header.getMessageId());
            if (gameMessage instanceof EnterGameMsgRequest) {
                EnterGameMsgResponse response = new EnterGameMsgResponse();//给客户端返回消息，测试
                GameMessageHeader responseHeader = this.createResponseGameMessageHeader(header);
                response.setHeader(responseHeader);
                response.getBodyObj().setNickname("天地无极");
                response.getBodyObj().setPlayerId(header.getPlayerId());
                GameMessagePackage gameMessagePackage2 = new GameMessagePackage();
                gameMessagePackage2.setHeader(responseHeader);
                gameMessagePackage2.setBody(response.body());
                //动态创建游戏网关监听消息的topic
                String topic = TopicUtil.generateTopic(serverConfig.getGatewayGameMessageTopic(), header.getFromServerId());
                byte[] value = GameMessageInnerDecoder.sendMessage(gameMessagePackage2);
                ProducerRecord<String, byte[]> responseRecord = new ProducerRecord<String, byte[]>(topic, String.valueOf(header.getPlayerId()), value);
                kafkaTemplate.send(responseRecord);
            }
        }
    }

    /**
     * 根据请求的包头，创建响应的包头
     *
     * @param requestGameMessageHeader
     * @return
     */
    private GameMessageHeader createResponseGameMessageHeader(GameMessageHeader requestGameMessageHeader) {
        GameMessageHeader newHeader = new GameMessageHeader();
        newHeader.setClientSendTime(requestGameMessageHeader.getClientSendTime());
        newHeader.setClientSeqId(requestGameMessageHeader.getClientSeqId());
        newHeader.setFromServerId(requestGameMessageHeader.getToServerId());
        newHeader.setMessageId(requestGameMessageHeader.getMessageId());
        newHeader.setPlayerId(requestGameMessageHeader.getPlayerId());
        newHeader.setServerSendTime(System.currentTimeMillis());
        newHeader.setServiceId(requestGameMessageHeader.getServiceId());
        newHeader.setToServerId(requestGameMessageHeader.getFromServerId());
        newHeader.setVersion(requestGameMessageHeader.getVersion());
        return newHeader;
    }
}
